11 ROS2 实践课-服务通信节点开发与调试

ROS2 服务通信节点开发与调试

关联:索引

  1. 场景提问:你要下发“急停”指令,最关键的响应信息应该是什么?

1. srv 文件的基本结构

# Request
<字段类型> <字段名>
...
---
# Response
<字段类型> <字段名>
...

服务名建议:/sorting/device/emergency_stop

# 请求字段(客户端→服务端)
string request_id
float64 timestamp
bool estop_trigger
---
# 响应字段(服务端→客户端)
bool ok
uint8 error_code
string request_id
float64 execution_time
string device_final_status
float64 response_timestamp

1. 创建工作空间与接口包(推荐接口独立成包)

source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create sorting_interfaces --build-type ament_cmake

说明:接口包选用 ament_cmake 是 ROS2 里生成 .srv/.msg/.action 的常见做法,不代表必须写 C++;服务端/客户端节点依然可以用 ament_python 开发。

建议目录:

package.xml(接口包最小可用模板,可直接替换包名/描述/维护者信息):

<?xml version="1.0"?>
<package format="3">
  <name>sorting_interfaces</name>
  <version>0.0.0</version>
  <description>Interface definitions for sorting robot</description>
  <maintainer email="[email protected]">teacher</maintainer>
  <license>Apache-2.0</license>

  <buildtool_depend>ament_cmake</buildtool_depend>

  <build_depend>rosidl_default_generators</build_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>

  <member_of_group>rosidl_interface_packages</member_of_group>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

如果 .srv 里使用了 builtin_interfaces/Time,在 package.xml 里额外加一行依赖:

  <depend>builtin_interfaces</depend>

CMakeLists.txt(接口包最小可用模板,把 srv 文件名填进去):

cmake_minimum_required(VERSION 3.8)
project(sorting_interfaces)

find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)

set(srv_files
  "srv/DeviceEmergencyStop.srv"
)

rosidl_generate_interfaces(${PROJECT_NAME}
  ${srv_files}
)

ament_export_dependencies(rosidl_default_runtime)
ament_package()

如果 .srv 里使用了 builtin_interfaces/Time,在 CMakeLists.txt 里补两处:

find_package(builtin_interfaces REQUIRED)

以及在 rosidl_generate_interfaces(...) 中增加:

  DEPENDENCIES builtin_interfaces

3. 编译与环境刷新

cd ~/ros2_ws
colcon build --symlink-install
source /opt/ros/humble/setup.bash
source install/setup.bash

4. 生成结果验证(必须能用命令看到接口)

查看接口是否已注册:

ros2 interface list | grep sorting_interfaces

查看 srv 具体字段:

ros2 interface show sorting_interfaces/srv/DeviceEmergencyStop

5. 小组产出(当堂提交)

  1. 快问快答:为什么自定义 srv 不编译就不能在节点里 import 使用?

1. 创建服务功能包(Python 示例)

source /opt/ros/humble/setup.bash
cd ~/ros2_ws/src
ros2 pkg create sorting_service_practice --build-type ament_python --dependencies rclpy sorting_interfaces

建议文件路径:

入口点配置(setup.py)示例结构:

entry_points={
    'console_scripts': [
        'emergency_stop_server = sorting_service_practice.emergency_stop_server:main',
        'emergency_stop_client = sorting_service_practice.emergency_stop_client:main',
    ],
},

实现代码(可直接复制):

服务端:emergency_stop_server.py

import time

import rclpy
from rclpy.node import Node

from sorting_interfaces.srv import DeviceEmergencyStop

class EmergencyStopServer(Node):
    def __init__(self) -> None:
        super().__init__('emergency_stop_server')
        self._device_status = 'RUNNING'
        self._service = self.create_service(
            DeviceEmergencyStop,
            '/sorting/device/emergency_stop',
            self._handle_emergency_stop,
        )

    def _handle_emergency_stop(
        self,
        request: DeviceEmergencyStop.Request,
        response: DeviceEmergencyStop.Response,
    ) -> DeviceEmergencyStop.Response:
        start_perf = time.perf_counter()
        response.request_id = request.request_id

        if not request.request_id or request.timestamp <= 0.0:
            response.ok = False
            response.error_code = 1
            response.device_final_status = self._device_status
            response.execution_time = time.perf_counter() - start_perf
            response.response_timestamp = time.time()
            return response

        if request.estop_trigger and self._device_status == 'STOPPED':
            response.ok = False
            response.error_code = 3
            response.device_final_status = self._device_status
            response.execution_time = time.perf_counter() - start_perf
            response.response_timestamp = time.time()
            return response

        if (not request.estop_trigger) and self._device_status == 'RUNNING':
            response.ok = False
            response.error_code = 3
            response.device_final_status = self._device_status
            response.execution_time = time.perf_counter() - start_perf
            response.response_timestamp = time.time()
            return response

        self._device_status = 'STOPPED' if request.estop_trigger else 'RUNNING'
        response.ok = True
        response.error_code = 0
        response.device_final_status = self._device_status
        response.execution_time = time.perf_counter() - start_perf
        response.response_timestamp = time.time()
        return response

def main(args=None) -> None:
    rclpy.init(args=args)
    node = EmergencyStopServer()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

客户端:emergency_stop_client.py

import time

import rclpy
from rclpy.node import Node

from sorting_interfaces.srv import DeviceEmergencyStop

def main(args=None) -> None:
    rclpy.init(args=args)
    node = Node('emergency_stop_client')

    client = node.create_client(DeviceEmergencyStop, '/sorting/device/emergency_stop')
    if not client.wait_for_service(timeout_sec=3.0):
        node.get_logger().error('service not available: /sorting/device/emergency_stop')
        node.destroy_node()
        rclpy.shutdown()
        return

    req = DeviceEmergencyStop.Request()
    req.request_id = 'r001'
    req.timestamp = time.time()
    req.estop_trigger = True

    future = client.call_async(req)
    rclpy.spin_until_future_complete(node, future, timeout_sec=3.0)

    if not future.done():
        node.get_logger().error('call timeout')
        node.destroy_node()
        rclpy.shutdown()
        return

    if future.exception() is not None:
        node.get_logger().error(f'call failed: {future.exception()}')
        node.destroy_node()
        rclpy.shutdown()
        return

    resp: DeviceEmergencyStop.Response = future.result()
    node.get_logger().info(
        f'ok={resp.ok}, error_code={resp.error_code}, request_id={resp.request_id}, '
        f'exec_time={resp.execution_time:.4f}, final_status={resp.device_final_status}, '
        f'resp_ts={resp.response_timestamp:.3f}'
    )

    node.destroy_node()
    rclpy.shutdown()

1. 编译与运行(先服务端后客户端)

cd ~/ros2_ws
colcon build --symlink-install
source /opt/ros/humble/setup.bash
source install/setup.bash

终端 A(启动服务端):

ros2 run sorting_service_practice emergency_stop_server

终端 B(查看服务与类型):

ros2 service list | grep emergency
ros2 service type /sorting/device/emergency_stop
ros2 interface show sorting_interfaces/srv/DeviceEmergencyStop
ros2 service call /sorting/device/emergency_stop sorting_interfaces/srv/DeviceEmergencyStop "{request_id: 'r001', timestamp: 1710000000.0, estop_trigger: true}"

若终端对引号较敏感,可改用单引号包裹 YAML:ros2 service call ... '{request_id: "r001", timestamp: 1710000000.0, estop_trigger: true}'

失败调用示例(重复指令,期望 ok=falseerror_code=3):

ros2 service call /sorting/device/emergency_stop sorting_interfaces/srv/DeviceEmergencyStop "{request_id: 'r002', timestamp: 1710000001.0, estop_trigger: true}"

失败调用示例(非法请求,期望 ok=falseerror_code=1):

ros2 service call /sorting/device/emergency_stop sorting_interfaces/srv/DeviceEmergencyStop "{request_id: '', timestamp: 0.0, estop_trigger: true}"

1. 接口定义/生成相关(编译时报错)

2. 找不到服务/调用超时(运行时最常见)

3. 环境/网络隔离问题(“我明明跑了但看不见”)

4. 权限/系统策略相关(多机或受限环境常见)

提示词模板(可直接复制):

你是 ROS2 Humble 的服务通信开发与调试专家。请基于以下需求输出可运行的实现与自测方法。
需求:分拣设备“急停控制”服务
- 接口规范:请先用 5 条要点总结 srv 接口设计规范(字段、错误码、幂等、可追溯、验收证据)
- 自定义 srv:请给出 .srv 内容(必须包含 request_id/timestamp/estop_trigger 与 execution_time/device_final_status/response_timestamp)
- Python rclpy:请给出服务端与客户端完整代码(关键逻辑处添加简短注释:字段校验、超时、错误码含义、execution_time 计算)
- 约束:客户端必须带超时;服务端必须对非法请求返回 ok=false 与 error_code;服务端必须原样返回 request_id
- 自测:给出 ros2 interface show、ros2 service list/type/call 的命令与期望关键输出(至少包含 1 次成功调用 + 1 次重复指令失败 + 1 次非法请求失败)
- 排错:如果出现“service not available / 找不到接口 / colcon 编译失败”,请按“现象→原因→定位命令→修复步骤→复验命令”输出
注意:不要假设任何文件已经存在;请同时给出推荐的包划分与文件路径(interfaces 包与节点包);涉及依赖必须在 package.xml / CMakeLists.txt 或 setup.py 中体现。